package com.yifeng.repo.base.utils.async;

import com.talkyun.utils.Looper;
import com.talkyun.utils.LooperPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by daibing on 2020/12/10.
 * <p>
 * Asynchronous processor that shards submitted items over a fixed number of bounded queues.
 * Items whose {@link #shardingKey(Object)} maps to the same queue are consumed by a single
 * looper in FIFO order and handed to {@link #handlePageData(List)} in pages.
 * <p>
 * Sub classes customise behaviour by overriding the protected hook methods.
 *
 * @param <T> type of the items being processed
 */
public class AsyncSortedProcessor<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncSortedProcessor.class);
    private final String name;
    private final int threadSize;
    private final ConcurrentMap<Integer, ArrayBlockingQueue<T>> queueMap;
    private LooperPool pool;

    /**
     * Creates the processor and pre-allocates one bounded queue per worker.
     *
     * @param name       name passed to every looper created by {@link #startup()}
     * @param threadSize number of queues (and loopers); must be positive
     * @param bufferSize capacity of each queue
     * @throws IllegalArgumentException if {@code threadSize} is not positive, or if
     *                                  {@code bufferSize} is rejected by {@link ArrayBlockingQueue}
     */
    public AsyncSortedProcessor(String name, int threadSize, int bufferSize) {
        // Fail fast: a zero threadSize would otherwise only surface later as a
        // division by zero when the first item is routed.
        if (threadSize <= 0) {
            throw new IllegalArgumentException("threadSize must be positive, threadSize=" + threadSize);
        }
        this.name = name;
        this.threadSize = threadSize;
        this.queueMap = new ConcurrentHashMap<>(threadSize);
        for (int i = 0; i < threadSize; i++) {
            queueMap.put(i, new ArrayBlockingQueue<T>(bufferSize));
        }
    }

    /**
     * Enqueues an item without blocking. While the target queue is full, the oldest queued
     * item is removed and reported through {@link #handleLostData(Object)} until the new
     * item fits.
     *
     * @param item item to enqueue
     * @throws RuntimeException if no queue exists for the item's sharding key
     */
    public void submit(T item) {
        ArrayBlockingQueue<T> queue = queueFor(item);
        while (!queue.offer(item)) {
            T old = queue.poll();
            // The consumer may have drained the queue between offer() and poll();
            // in that case nothing was evicted and there is nothing to report.
            if (old != null) {
                this.handleLostData(old);
            }
        }
    }

    /**
     * Enqueues an item, blocking while the target queue is full.
     *
     * @param item item to enqueue
     * @throws RuntimeException if no queue exists for the item's sharding key, or if the
     *                          calling thread is interrupted while waiting (the interrupt
     *                          flag is restored before the exception is thrown)
     */
    public void put(T item) {
        ArrayBlockingQueue<T> queue = queueFor(item);
        try {
            queue.put(item);
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Maximum number of items collected into one page.
     *
     * @return page size, 100 unless overridden
     */
    protected int pageSize() {
        // impl by sub class
        return 100;
    }

    /**
     * Maximum time to wait for each single item while a page is being collected.
     *
     * @return timeout in milliseconds, 1000 unless overridden
     */
    protected int queuePollTimeoutMillis() {
        // impl by sub class
        return 1000;
    }

    /**
     * Computes the key used to pick the queue for an item.
     *
     * @param item item being enqueued
     * @return sharding key; the default of 0 routes every item to the first queue
     */
    protected int shardingKey(T item) {
        // impl by sub class, return key's hashcode from item.
        return 0;
    }

    /**
     * Called by {@link #submit(Object)} for every item evicted from a full queue.
     *
     * @param item the evicted item
     */
    protected void handleLostData(T item) {
        // impl by sub class
    }

    /**
     * Processes one page of items. The list may be empty when no item arrived within the
     * poll timeout.
     *
     * @param itemList items of the current page
     * @return value forwarded to {@link #print(int, int, String)} as {@code handleCount}
     */
    protected String handlePageData(List<T> itemList) {
        // impl by sub class, return valid data count handle by sub class
        return null;
    }

    /**
     * Called when a looper iteration failed.
     *
     * @param itemList items of the page held by the looper when the failure occurred;
     *                 empty when the failure happened before any page was collected
     * @param t        the failure
     */
    protected void handlePageErrorData(List<T> itemList, Throwable t) {
        // impl by sub class
    }

    /**
     * Reporting hook called after each successfully handled page.
     *
     * @param queueSize   number of items still waiting in the looper's queue
     * @param itemsSize   number of items in the page just handled
     * @param handleCount value returned by {@link #handlePageData(List)}
     */
    protected void print(int queueSize, int itemsSize, String handleCount) {
        // impl by sub class
    }

    /**
     * Creates the looper pool and starts it. Each looper built by the pool takes the next
     * queue index, so the n-th looper consumes the n-th queue.
     */
    public void startup() {
        LOGGER.info("start async sorted {} processor ... ", name);
        final AtomicInteger index = new AtomicInteger(0);
        final int pageSize = pageSize();
        final int queuePollTimeoutMillis = queuePollTimeoutMillis();
        pool = new LooperPool(threadSize, new LooperPool.LooperBuilder() {
            @Override
            public Looper build() {
                // NOTE(review): the meaning of the 50 / 1500 arguments is defined by Looper
                // and is not visible from this file.
                return new Looper(name, 50, 1500) {
                    private final BlockingQueue<T> queue = queueMap.get(index.getAndIncrement());
                    // Never null, so loopThrowable is safe even when the very first
                    // iteration fails before a page has been collected.
                    private List<T> itemList = new ArrayList<>();

                    @Override
                    protected void loop() throws Throwable {
                        // A non-empty list here is a page left over from an earlier
                        // iteration; it is handled again instead of polling a new one.
                        if (itemList.isEmpty()) {
                            itemList = listByPageSize(queue, pageSize, queuePollTimeoutMillis);
                        }
                        String count = handlePageData(itemList);
                        print(queue.size(), itemList.size(), count);
                        itemList.clear();
                    }

                    @Override
                    protected void loopThrowable(Throwable t) {
                        handlePageErrorData(itemList, t);
                        itemList.clear();
                    }
                };
            }
        });
        pool.startup();
    }

    /**
     * Shuts the looper pool down; does nothing when {@link #startup()} was never called.
     */
    public void shutdown() {
        if (pool != null) {
            pool.shutdown();
        }
    }

    /**
     * Resolves the queue an item is routed to.
     *
     * @param item item being enqueued
     * @return the queue selected by the item's sharding key
     * @throws RuntimeException if no queue is registered for the computed index
     */
    private ArrayBlockingQueue<T> queueFor(T item) {
        // Clear the sign bit so the modulo below never yields a negative index.
        int shardingKey = shardingKey(item) & Integer.MAX_VALUE;
        int key = shardingKey % threadSize;
        ArrayBlockingQueue<T> queue = queueMap.get(key);
        if (queue == null) {
            throw new RuntimeException(String.format("queue is null,shardingKey=%s,threadSize=%s,queueMap=%s", shardingKey, threadSize, queueMap.keySet()));
        }
        return queue;
    }

    /**
     * Collects up to {@code pageSize} items from the queue, waiting at most
     * {@code queuePollTimeoutMillis} for each one and stopping at the first timeout.
     *
     * @param queue                  queue to poll
     * @param pageSize               maximum number of items to collect
     * @param queuePollTimeoutMillis per-item wait limit in milliseconds
     * @return the collected items, possibly empty
     * @throws InterruptedException if interrupted while waiting for an item
     */
    private List<T> listByPageSize(BlockingQueue<T> queue, int pageSize, int queuePollTimeoutMillis) throws InterruptedException {
        List<T> items = new ArrayList<>(pageSize);
        for (int i = 0; i < pageSize; i++) {
            T item = queue.poll(queuePollTimeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                break;
            }
            items.add(item);
        }
        return items;
    }
}
